'''
# author: Zhiyuan Yan
# email: zhiyuanyan@link.cuhk.edu.cn
# date: 2023-07-06
# description: Class for the RFMDetector

Functions in the Class are summarized as:
1. __init__: Initialization
2. build_backbone: Backbone-building
3. build_loss: Loss-function-building
4. features: Feature-extraction
5. classifier: Classification
6. get_losses: Loss-computation
7. get_train_metrics: Training-metrics-computation
8. get_test_metrics: Testing-metrics-computation
9. forward: Forward-propagation

Reference:
@inproceedings{wang2021representative,
  title={Representative forgery mining for fake face detection},
  author={Wang, Chengrui and Deng, Weihong},
  booktitle={Proceedings of the IEEE/CVF conference on computer vision and pattern recognition},
  pages={14923--14932},
  year={2021}
}
'''

import os
import datetime
import logging
import numpy as np
from sklearn import metrics
from typing import Union
from collections import defaultdict
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn import DataParallel
from torch.utils.tensorboard import SummaryWriter

from metrics.base_metrics_class import calculate_metrics_for_train

from .base_detector import AbstractDetector
from detectors import DETECTOR
from networks import BACKBONE
from loss import LOSSFUNC


logger = logging.getLogger(__name__)

@DETECTOR.register_module(module_name='rfm')
class RFMDetector(AbstractDetector):
    """RFM (Representative Forgery Mining) detector.

    Wraps a backbone classifier and, during training, erases the most
    forgery-sensitive image regions — located via a Forgery Attention Map
    (FAM) computed from input gradients — to force the model to mine
    additional representative forgery cues.

    Reference: Wang & Deng, "Representative Forgery Mining for Fake Face
    Detection", CVPR 2021.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.backbone = self.build_backbone(config)
        self.loss_func = self.build_loss(config)

    def build_backbone(self, config):
        """Build the backbone network and load its pretrained weights.

        Without the pretrained weights the detector fails to reach good
        results, so loading is not optional in practice.
        """
        backbone_class = BACKBONE[config['backbone_name']]
        model_config = config['backbone_config']
        backbone = backbone_class(model_config)
        state_dict = torch.load(config['pretrained'])
        # Xception-style checkpoints store pointwise convs as 2-D tensors;
        # conv2d weights need trailing singleton spatial dims.
        for name, weights in state_dict.items():
            if 'pointwise' in name:
                state_dict[name] = weights.unsqueeze(-1).unsqueeze(-1)
        # Drop the final fc layer so the classifier head is trained from scratch.
        state_dict = {k: v for k, v in state_dict.items() if 'fc' not in k}
        backbone.load_state_dict(state_dict, strict=False)
        logger.info('Load pretrained model successfully!')
        return backbone

    def build_loss(self, config):
        """Instantiate the loss function named in the config."""
        loss_class = LOSSFUNC[config['loss_func']]
        loss_func = loss_class()
        return loss_func

    def features(self, data_dict: dict) -> torch.Tensor:
        """Extract backbone features from data_dict['image']."""
        return self.backbone.features(data_dict['image'])

    def classifier(self, features: torch.Tensor) -> torch.Tensor:
        """Map backbone features to classification logits."""
        return self.backbone.classifier(features)

    def get_losses(self, data_dict: dict, pred_dict: dict) -> dict:
        """Compute the overall classification loss for a batch."""
        label = data_dict['label']
        pred = pred_dict['cls']
        loss = self.loss_func(pred, label)
        return {'overall': loss}

    def get_train_metrics(self, data_dict: dict, pred_dict: dict) -> dict:
        """Compute batch-level training metrics (acc/auc/eer/ap)."""
        label = data_dict['label']
        pred = pred_dict['cls']
        auc, eer, acc, ap = calculate_metrics_for_train(label.detach(), pred.detach())
        return {'acc': acc, 'auc': auc, 'eer': eer, 'ap': ap}

    def cal_fam(self, inputs):
        """Compute the Forgery Attention Map (FAM) for a batch of images.

        The FAM is the channel-wise max of |d(logit_fake - logit_real)/d(input)|,
        i.e. the input-gradient saliency of the fake-vs-real decision.
        Returns a tensor of shape (N, 1, H, W) on the same device as `inputs`.
        """
        self.backbone.zero_grad()
        inputs = inputs.detach().clone()
        inputs.requires_grad_()
        _, output = self.backbone(inputs)
        target = output[:, 1] - output[:, 0]
        # Fix: build the upstream gradient with ones_like so it lives on the
        # input's device/dtype instead of hard-coding .cuda() (which broke
        # CPU runs and non-default CUDA devices).
        target.backward(torch.ones_like(target))
        fam = torch.abs(inputs.grad)
        fam = torch.max(fam, dim=1, keepdim=True)[0]
        return fam

    def apply_rfm_augmentation(self, data):
        """Suspicious Forgery Erasing: mask out the top-saliency regions.

        For each image, up to 3 rectangles around the highest-FAM pixels are
        replaced with uniform noise in [-1, 1), forcing the network to rely
        on other, more representative forgery traces.
        """
        self.backbone.eval()

        # Call the method self.cal_fam rather than a free function cal_fam.
        mask = self.cal_fam(data)
        imgmask = torch.ones_like(mask)
        # Generalized: derive the spatial size from the input instead of
        # hard-coding 256x256 (the flat-index decoding below depends on it).
        imgh, imgw = data.shape[-2], data.shape[-1]

        for i in range(len(mask)):
            # Pixel indices sorted by descending saliency.
            maxind = np.argsort(mask[i].cpu().numpy().flatten())[::-1]
            pointcnt = 0
            for pointind in maxind:
                pointx = pointind // imgw
                pointy = pointind % imgw

                # Skip points already covered by a previously erased rectangle.
                if imgmask[i][0][pointx][pointy] == 1:
                    # Maximum erasing-block extent — assumed from the paper's
                    # setup; TODO confirm against training config.
                    eH, eW = 120, 120
                    maskh = random.randint(1, eH)
                    maskw = random.randint(1, eW)

                    # Random split of the block around the salient point.
                    sh = random.randint(1, maskh)
                    sw = random.randint(1, maskw)

                    top = max(pointx - sh, 0)
                    bot = min(pointx + (maskh - sh), imgh)
                    lef = max(pointy - sw, 0)
                    rig = min(pointy + (maskw - sw), imgw)

                    imgmask[i][:, top:bot, lef:rig] = torch.zeros_like(
                        imgmask[i][:, top:bot, lef:rig])

                    pointcnt += 1
                    if pointcnt >= 3:
                        break

        # Keep unmasked pixels; fill erased regions with uniform noise in [-1, 1).
        data = imgmask * data + (1 - imgmask) * (torch.rand_like(data) * 2 - 1)

        self.backbone.train()

        return data

    def forward(self, data_dict: dict, inference=False) -> dict:
        """Forward pass; applies RFM erasing augmentation during training only."""
        if not inference:
            data_dict['image'] = self.apply_rfm_augmentation(data_dict['image'])

        # get the features by backbone
        features = self.features(data_dict)
        # get the prediction by classifier
        pred = self.classifier(features)
        # probability of the 'fake' class (index 1)
        prob = torch.softmax(pred, dim=1)[:, 1]
        pred_dict = {'cls': pred, 'prob': prob, 'feat': features}

        return pred_dict
